package org.example.launch

import com.alibaba.fastjson.serializer.SerializerFeature
import com.alibaba.fastjson.{JSON, JSONObject}
import org.example.client.DbClient
import org.example.common.{Logging, Sparking}
import org.example.constant.{ApolloConst, RiskConst}
import org.example.dao._
import org.example.enums.{ExpressionEnum, WarningFieldEnum, WarningTypeEnum}
import org.example.input.RiskRule
import org.example.logic.{AlarmProduce, RiskSuppress}
import org.example.originUtils.EquipmentAlarmProduce
import org.example.utils._
import io.searchbox.core.Search
import org.apache.commons.collections.MapUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import redis.clients.jedis.Jedis
import scalikejdbc.SQL

import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Random


/**
 * 风险与报警生成，将报警数据和风险数据实时写入ES org.example.launch.RiskProduce
 */
object RiskProduce extends Sparking with Logging with Serializable {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(conf, Seconds(20))
    val zkManager = ZkManager(ApolloConst.zkKafka)
    val kafkaParams = getKafkaParams(ApolloConst.bootstrap, "riskProduceGroup3.2")
    val offsets = zkManager.getBeginOffset(ApolloConst.GPS_TOPICS, "riskProduceGroup3.2")
    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](ApolloConst.GPS_TOPICS, kafkaParams, offsets))
    //获取风险规则信息并广播
    val risks = RiskRule.getRisks()
    var riskBc = ssc.sparkContext.broadcast(risks)
    //广播报警基本信息
    var baseWarnInfoBc = ssc.sparkContext.broadcast(RiskRule.getBaseWarn())
    //广播夜间时段
    var riskTimeDeployBc = ssc.sparkContext.broadcast(RiskRule.getRiskTimeDeploy())

    //广播redis连接池
    val redisSink: RedisSink = RedisSink()
    var redisBc = ssc.sparkContext.broadcast(redisSink)
    //更新redis中人、车、企基本信息（已录入数据）
    redisSink.usingRedis(redis => {
      redis.select(9)
      SearchInfo.updateBaseInfo(redis)
    })
    //更新监控的车辆信息
    RiskRule.updateVehicleNos(redisSink)

    val offsetRanges = new ArrayBuffer[OffsetRange]()
    inputStream.transform { rdd =>
      offsetRanges.clear()
      offsetRanges.append(rdd.asInstanceOf[HasOffsetRanges].offsetRanges: _*)
      rdd
    }.map(x => x.value()).foreachRDD(rdd => {
      //println("--------------------------新批次:"+ System.currentTimeMillis() + "----------------------------------")
      if (!rdd.isEmpty()) {
        try {
          // 每批数据更新报警基本信息
          baseWarnInfoBc.unpersist()
          baseWarnInfoBc = ssc.sparkContext.broadcast(RiskRule.getBaseWarn())
          // 每批数据更新风险规则信息
          riskBc.unpersist()
          riskBc = ssc.sparkContext.broadcast(RiskRule.getRisks())
          // 每批数据更新夜间时间字段
          riskTimeDeployBc.unpersist()
          riskTimeDeployBc = ssc.sparkContext.broadcast(RiskRule.getRiskTimeDeploy())
          //设备异常
          val now = new DateTime()
          val flagTime = now.toString("yyyy-MM-dd HH:mm:ss")
          val redis = redisBc.value
          redis.usingRedis { r =>
            r.select(9)
            //每小时检查一次设备异常情况
            if (flagTime.split(":")(1).toInt % 30 == 0) {
              val loginOutCheck = r.get("lastOnlineCheck")
              if (StringUtils.isEmpty(loginOutCheck) || "0".equals(loginOutCheck)) {
                r.set("lastOnlineCheck", "1")
                //GPS设备上线异常和智能设备上线异常处理,以及三天超时GPS设备未上线异常和智能设备上线异常处理
                smartEquipmentAlarm(flagTime, riskBc.value, baseWarnInfoBc.value, redisBc.value)
              }
            }
            else {
              r.set("lastOnlineCheck", "0")
            }
          }
        }
        //redis连接池关闭的情况重新建立连接池
        if (redisBc.value.pool.isClosed) {
          redisBc.unpersist()
          redisBc = ssc.sparkContext.broadcast(RedisSink())
        }
        //数据分类
        val rddClass = rdd.map { row =>
          val off: OffsetRange = offsetRanges(TaskContext.get.partitionId)
          if (off.topic.equals("UP_EXG_MSG_REAL_LOCATION")) {
            (0, row)
          } else if (off.topic.equals("UP_WARN_MSG_ADPT_INFO")) {
            (1, row)
          }
          else {
            (2, row)
          }
        }
        rddClass.cache()

        //筛选数据,车辆实时定位信息
        val locationRdd = rddClass.filter(x => x._1 == 0).map(x => x._2)
        //车辆报警数据
        val warnRdd = rddClass.filter(x => x._1 == 1).map(x => x._2)
        val rddVaule: RDD[(Int, String)] = rddClass.filter(x => x._1 == 0 || x._1 == 1)
        //处理数据--生成报警报警和风险
        handleLocation(rddVaule, redisBc, baseWarnInfoBc, riskTimeDeployBc, riskBc)
        //  warnData2Es(warnRdd)
        zkManager.saveEndOffset(offsetRanges, "riskProduceGroup3.2")
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }


  /**
   * 处理gps和智能报警设备信息
   *
   * @param rdd
   */
  def handleLocation(rdd: RDD[(Int,String)], redisBC: Broadcast[RedisSink], baseWarnInfoBc: Broadcast[Map[String, String]], riskTimeDeployBc: Broadcast[(Int, Int)], riskBc: Broadcast[Map[Long, RiskInfo]]): Unit = {

    rdd.foreachPartition(partition => {
      var esMap = mutable.HashMap[String, String]()
      val riskList = new ListBuffer[AlarmNumber]()
      val esList = new ListBuffer[(String, String)]()
      redisBC.value.usingRedis { jedis =>
        jedis.select(9)
        partition.foreach(rddclass => {
          val msgType: Int = rddclass._1
          var messageInfo: JSONObject = null
          try {
            messageInfo = JSON.parseObject(rddclass._2)
          } catch {
            case e: Exception => error(s"该条数据格式错误:${rddclass._2}")
          }
          msgType match {
            case 0 => {
              /*var isMonitor = "0"
            if (messageInfo.containsKey("vehicleNo") && messageInfo.containsKey("vehicleColor")) {
              val vehicleNoMonitor = messageInfo.getString("vehicleNo") + "#" + messageInfo.getString("vehicleColor")
              isMonitor = jedis.hget("monitorVehicle", vehicleNoMonitor)
            }*/
              //自定义类报警生成
//              println("自定义报警：" + messageInfo)
              AlarmProduce.customAlarmProduce (messageInfo, jedis, baseWarnInfoBc.value, riskTimeDeployBc.value, esList, riskList)
            }
            case 1 => {
              //智能设备类报警
//              println("智能报警：" + messageInfo)
              AlarmProduce.smartAlarmProduce(messageInfo, jedis, baseWarnInfoBc.value, esList, riskList)
              }
          }
        })
      }
      EsUtils.insertBulk("warninfo_index", "warninfo_type", esList)

      warn("riskList"+riskList.length)
      // 风险生成
      redisBC.value.usingRedis { redis =>
        redis.select(9)
        createRiskBulk(riskList, riskBc.value, redis, baseWarnInfoBc.value)
      }
    })
  }

  /**
   * GPS设备上线异常和智能设备上线异常处理
   *
   * @param dateTime
   * @param risks
   * @param baseWarn
   * @param jedis
   */
  def smartEquipmentAlarm(dateTime: String, risks: Map[Long, RiskInfo], baseWarn: Map[String, String], jedis: RedisSink): Unit = {

    //智能设备的最后上线时间
    val smartDatas = jedis.usingRedis(r => {
      r.select(9)
      r.hgetAll("lastOnlineSmartDay")
    })
    //gps设备的最后上线时间
    val gpsDatas = jedis.usingRedis(r => {
      r.select(9)
      r.hgetAll("lastOnlineGpsDay")
    })
    val riskList = new ListBuffer[AlarmNumber]()
    val warnList = new ListBuffer[(String, String)]()
    //智能设备报警
    if (baseWarn.contains(WarningTypeEnum.SMART_EQUIPMENT_ABNORMAL_ALARM.toString)) {
      EquipmentAlarmProduce.onlineAbnormalAlarm("lastOnlineSmartDay", dateTime, smartDatas, gpsDatas, baseWarn, WarningTypeEnum.SMART_EQUIPMENT_ABNORMAL_ALARM.toString, jedis, warnList, riskList)
    }
    //新增：智能设备三天未上线异常报警 （普货/危货/客运/出租车）
    if(baseWarn.contains(WarningTypeEnum.SMART_FOR_3DAYS_OF_EQUIPMENT_ABNORMAL_ALARM.toString))
      {
        EquipmentAlarmProduce.online3DayAbnormalAlarm("lastOnlineSmartDay",dateTime,smartDatas,gpsDatas,baseWarn,WarningTypeEnum.SMART_FOR_3DAYS_OF_EQUIPMENT_ABNORMAL_ALARM.toString,jedis,warnList,riskList)
      }
    //gps设备报警
    if (baseWarn.contains(WarningTypeEnum.DEVICE_GOESON_LINE_ABNORMALLY.toString)) {
      EquipmentAlarmProduce.onlineAbnormalAlarm("lastOnlineGpsDay", dateTime, gpsDatas, smartDatas, baseWarn, WarningTypeEnum.DEVICE_GOESON_LINE_ABNORMALLY.toString, jedis, warnList, riskList)
    }
    //新增：GPS设备三天未上线异常报警 （普货/危货/客运/出租车）
    if(baseWarn.contains(WarningTypeEnum.DEVICE_FOR_3DAYS_OF_GOESON_LINE_ABNORMALLY.toString))
      {
        EquipmentAlarmProduce.online3DayAbnormalAlarm("lastOnlineGpsDay",dateTime,gpsDatas,smartDatas,baseWarn,WarningTypeEnum.DEVICE_FOR_3DAYS_OF_GOESON_LINE_ABNORMALLY.toString,jedis,warnList,riskList)
      }
    //报警和风险的生成
    if (warnList.nonEmpty) {
      // 报警信息插入es
      EsUtils.insertBulk("warninfo_index", "warninfo_type", warnList)

      // 风险生成
      jedis.usingRedis(redis => {
        redis.select(9)
        createRiskBulk(riskList, risks, redis, baseWarn)
      })
    }
  }

  /**
   * 批量生成风险
   *
   * @param alarms
   * @param riskRules
   * @param redis
   */
  def createRiskBulk(alarms: Iterable[AlarmNumber],
                     riskRules: Map[Long, RiskInfo],
                     redis: Jedis,
                     baseWarnInfo: Map[String, String]): Unit = {
    if (alarms.nonEmpty) {
      val genRisks = new ListBuffer[RiskGenerate]()
      alarms.foreach { alarm =>
        //当前风险规则
        val carCurrentWarn = alarm.currentWarn
        val warnTypeCodes = alarm.warnTypeCodes
        val warnTime = alarm.warnTime
        //车辆使用性质
        val useNature = alarm.useNature
        //车辆管控类型
        val controlType = alarm.controlType
        //车辆周期
        val tripStart = alarm.tripStart
        for ((id, riskInfo) <- riskRules) {
          /*//判断风险是否适用于这个车辆管控类型
          val vehicleTypes = riskInfo.vehicleTypes
          if (vehicleTypes.nonEmpty && (StringUtils.isEmpty(controlType) || vehicleTypes.get.contains(controlType)) && riskInfo.riskRules.nonEmpty) {*/
          //判断风险是否适用于这个车辆使用性质V2.3.1
          val useNatures = riskInfo.useNatures
          if (useNatures.nonEmpty && (StringUtils.isEmpty(useNature) || useNatures.get.contains(useNature)) && riskInfo.riskRules.nonEmpty) {
            //有无短时高频判断需求:1是、0否
            val frequency = riskInfo.isShortTimeHighFrequency
            //算法周期
            val algorithmCycle = riskInfo.algorithmCycle.getOrElse(0)
            //各风险等级的规则
            val rules = riskInfo.riskRules.get
            //风险等级编码
            var level = "0"
            //是否追加报警记录
            var isAddAlarm = "0"
            var riskGradeWarnings = new mutable.HashMap[String, Iterable[AlarmEvidence]]()
            //风险规则id
            var ruleId = 0L
            rules.foreach { rule =>
              if (rule.riskRules.nonEmpty) {
                val warnings = new mutable.HashMap[String, Iterable[AlarmEvidence]]()
                //风险下的报警规则
                val warns = rule.riskRules.get
                var count = 0
                var warnTypeExist = false
                warns.foreach { warn =>
                  try {
                    WarningFieldEnum.withName(warn.warningFieldCode) match {
                      //报警次数
                      case WarningFieldEnum.WARN_TIMES => {
                        var tuple = (count, warnTypeExist)
                        //存在算法周期时，判断在算法周期内是否满足风险生成条件
                        if (frequency && algorithmCycle > 0) {
                          tuple = warnAlgorithmCondition(warn, carCurrentWarn, warnTypeCodes, algorithmCycle, warnings, count, warnTypeExist, false)
                        } else {
                          //判断在行驶周期内是否满足风险生成条件
                          tuple = warnCondition(warn, carCurrentWarn, warnTypeCodes, warnings, count, warnTypeExist, false)
                        }
                        count = tuple._1
                        warnTypeExist = tuple._2
                      }
                      //连续报警次数
                      case WarningFieldEnum.CONTINUOUS_WARN_TIMES => {
                        var tuple = (count, warnTypeExist)
                        //存在算法周期时，判断在算法周期内是否满足风险生成条件
                        if (frequency && algorithmCycle > 0) {
                          tuple = warnAlgorithmCondition(warn, carCurrentWarn, warnTypeCodes, algorithmCycle, warnings, count, warnTypeExist, true)
                        } else {
                          //判断在行驶周期内是否满足风险生成条件
                          tuple = warnCondition(warn, carCurrentWarn, warnTypeCodes, warnings, count, warnTypeExist, true)
                        }
                        count = tuple._1
                        warnTypeExist = tuple._2
                      }
                      //是否下坡
                      case WarningFieldEnum.IS_DOWN_HILL => error(s"暂不支持的报警数据字段${warn.warningFieldCode}")
                      //连续驾驶时长
                      case WarningFieldEnum.CONTINUOUS_DRIVING_TIME => {
                        val tuple = drivingCondition(warn, carCurrentWarn, warnTypeCodes, warnings, count, warnTypeExist)
                        count = tuple._1
                        warnTypeExist = tuple._2
                      }
                      //是否追加报警记录
                      case WarningFieldEnum.IS_ADD_ALARM => {
                        count = count + 1
                        isAddAlarm = warn.fieldValue
                      }
                      //其他
                      case _ => error(s"暂不支持的报警数据字段${warn.warningFieldCode}")
                    }
                  } catch {
                    case e: NoSuchElementException => {
                      error(s"暂不支持的报警数据字段${warn.warningFieldCode}", e)
                    }
                  }
                }

                // 判断是否生成风险
                if (warns.size == count && warnTypeExist) {
                  if (rule.level >= level) {
                    level = rule.level
                    riskGradeWarnings = warnings
                    ruleId = rule.id
                  }
                }
              }
            }
            //风险等级大于1代表应该生成新的风险
            if (level.toInt > 0) {
              var warnTimes = warnTime
              if (warnTime.length == 10) {
                warnTimes = warnTime + "000"
              } else if (warnTime.length == 16) {
                warnTimes = (warnTime.toLong / 1000).toString
              }
              val riskCodeTime: String = AlarmProduce.getRiskCodeTime(warnTimes.toLong)
              //根据风险等级编码获取风险等级名称
              val riskGradeName = level match {
                case "1" => "较小风险"
                case "2" => "一般风险"
                case "3" => "较大风险"
                case "4" => "重大风险"
                case _ => "风险有误"
              }
              //风险详情描述
              var str = alarm.vehicleNo + "触发" + riskGradeName + ","
              //风险类型编码
              val riskTypeCode = id.toString
              //报警详情
              riskGradeWarnings.foreach { riskGradeWarning =>
                str += baseWarnInfo.getOrElse(riskGradeWarning._1, riskGradeWarning._1) + riskGradeWarning._2.size + "次,"
              }
              val riskDescription: String = str.substring(0, str.lastIndexOf(","))
              val riskId = alarm.vehicleNo + "_" + alarm.vehicleColor + "_" + tripStart + "_" + ruleId + "_" + level
              genRisks += (RiskGenerate(riskId, riskInfo.riskName, riskGradeName, level, riskDescription, riskTypeCode,
                alarm.vehicleNo, alarm.vehicleColor, alarm.speed, alarm.longitude,
                alarm.latitude, alarm.warnTime, riskCodeTime, riskGradeWarnings, isAddAlarm, tripStart))
            }
          }
        }
      }
      if (!genRisks.isEmpty) {
        //企业信息
        val enterprise: util.Map[String, String] = redis.hgetAll("enterprise")

        val risks = new ListBuffer[(String, String)]()
        //对需要追加报警信息的查询历史风险数据
        val historyRisk = queryAddAlarm(genRisks)
        //风险生成时间
        val generateTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
        genRisks.foreach { x =>
          //风险的id
          val id = x.id
          //风险编号
          val riskCode = CommonUtils.createRiskNumber
          //风险名称
          val riskName = x.riskName
          //风险等级名称
          val riskGradeName = x.riskGradeName
          //风险等级code
          val level = x.level
          //风险描述
          val riskDescription = x.riskDescription
          //风险编码
          val riskTypeCode = x.riskTypeCode
          //车牌号
          val vehicleNo = x.vehicleNo
          //车牌颜色
          val vehicleColor = x.vehicleColor
          //报警详情主键
          val primaryKey = vehicleNo + "#" + vehicleColor
          //车辆速度
          val speed = x.speed
          //经度
          val longitude = x.longitude
          //纬度
          val latitude = x.latitude
          //业务时间
          val warnTime = x.warnTime
          // 当前天
          val riskCodeTime = x.riskCodeTime
          //行驶速度
          var speed10 = ""
          if (!speed.isEmpty) {
            if (speed.contains("0x")) {
              val speeds = speed.replace("0x", "")
              speed10 = Integer.parseInt(speeds, 16).toString
            } else {
              speed10 = speed
            }
          }
          val driverInfo: util.Map[String, String] = redis.hgetAll("driverInfo:" + primaryKey)
          //最后一次报警纬度
          var lat = "-"
          if (!latitude.equals("-")) {
            val latFormat: java.lang.Double = latitude.toDouble
            lat = String.format("%.6f", latFormat)
          }
          //最后一次报警经度
          var lon = "-"
          if (!longitude.equals("-")) {
            val lonFormat: java.lang.Double = longitude.toDouble
            lon = String.format("%.6f", lonFormat)
          }
          var lastWarnAddress = ""
          val province = "贵州省"
          val city = "贵阳市"
          var area = ""
          var roadId = ""
          var roadName = ""

          var typeCodeNum = ""
          val random1 = new Random()
          val typeCodeList = List( "1", "2", "3", "4", "5")
          val lastWarnAddressMap1: mutable.Map[String, String] = mutable.Map("1" -> "贵阳绕城高速与兰海高速互通交叉口", "2" -> "贵阳绕城高速与银百高速交叉口", "3" -> "贵阳南环高速与厦蓉高速立交口", "4" -> "贵阳南环高速与花安高速立交口", "5" -> "贵阳绕城高速与沪昆高速立交口")
          val lastWarnAddressMap2: mutable.Map[String, String] = mutable.Map("1" -> "白云区", "2" -> "乌当区", "3" -> "南明区", "4" -> "花溪区", "5" -> "观山湖区")
          val lastWarnAddressMap3: mutable.Map[String, String] = mutable.Map("1" -> "021", "2" -> "022", "3" -> "023", "4" -> "024", "5" -> "025")
          val lastWarnAddressMap4: mutable.Map[String, String] = mutable.Map("1" -> "贵阳绕城高速", "2" -> "银百高速", "3" -> "贵阳南环高速", "4" -> "花安高速", "5" -> "沪昆高速")
          val typeCodeListIndex: Int = random1.nextInt(typeCodeList.size)
          typeCodeNum = typeCodeList(typeCodeListIndex)
          lastWarnAddress = lastWarnAddressMap1(typeCodeNum)
          area = lastWarnAddressMap2(typeCodeNum)
          roadId = lastWarnAddressMap3(typeCodeNum)
          roadName = lastWarnAddressMap4(typeCodeNum)
          //根据经纬度获取最后一次报警地点、省份、城市、区域
//          if (StringUtils.isNotEmpty(longitude) && StringUtils.isNotEmpty(latitude) && !longitude.equals("-") && !latitude.equals("-")) {
//            try {
//              val add: (String, String, String, String) = GeoCodeUtil.getAddressAndRoad(longitude.toDouble, latitude.toDouble)
//              lastWarnAddress = add._1
//              val adcode: String = add._2
//              roadId = add._3
//              roadName = add._4
//              if (!adcode.isEmpty) {
//                province = adcode.substring(0, 2)
//                city = adcode.substring(2, 4)
//                area = adcode.substring(4, 6)
//              }
//            } catch {
//              case e: Exception => {
//                error("经纬度获取错误!", e)
//              }
//            }
//          }

          //获取终端识别驾驶人信息
          /*val currDriverInfo = getCurrDriver(driverInfo, x.tripStart)
          //驾驶员姓名
          val driverName = currDriverInfo._1
          //驾驶员从业资格证号
          val licenseNumber = currDriverInfo._2*/
          val driverCurrInfo: DriverInfo = getDriverBaseInfo(vehicleNo)//配合演示环境，临时手动查询基础表匹配
          //驾驶员姓名
          val driverName = driverCurrInfo.name
          //驾驶员从业资格证号
          val licenseNumber = driverCurrInfo.license_number
          val personInfo: util.Map[String, String] = redis.hgetAll("licenseNumber:" + licenseNumber)
          //驾驶员所属公司编码
          val driverEnterpriseCode = personInfo.get("driverEnterprise")
          //驾驶员所属公司
          val driverEnterprise = enterprise.get(driverEnterpriseCode)
          //司机联系方式
          val contactInfo = personInfo.get("telephone")

          //获取最新的ic卡驾驶员信息
          //val signDriverInfo = getSignCurrDriver(primaryKey, warnTime, redis)
          /*//登签驾驶员姓名
          val cardDriverName = signDriverInfo._1
          //登签驾驶员从业资格证号
          val cardDriverLicence = signDriverInfo._2*/
          //登签驾驶员姓名
          val cardDriverName = driverCurrInfo.name
          //登签驾驶员从业资格证号
          val cardDriverLicence = driverCurrInfo.qualification_certificate_number
          //登签驾驶员所属企业编号
          var cardDriverEnterpriseCode = ""
          //登签驾驶员所属企业名称
          var cardDriverEnterprise = ""
          //登签驾驶员联系方式
          var cardContactInfo = ""
          if (StringUtils.isNotEmpty(cardDriverLicence)) {
            val cardDriverInfo: util.Map[String, String] = redis.hgetAll("licenseNumber:" + cardDriverLicence)
            //驾驶员所属公司编码
            cardDriverEnterpriseCode = cardDriverInfo.get("driverEnterprise")
            //驾驶员所属公司
            cardDriverEnterprise = enterprise.get(cardDriverEnterpriseCode)
            //司机联系方式
            cardContactInfo = cardDriverInfo.get("telephone")
          }

          //车辆所属公司编码
          var vehicleEnterpriseCode = driverInfo.get("vehicleEnterprise")
          //营运性质
          var useNature = driverInfo.get("useNature")
          if (StringUtils.isEmpty(vehicleEnterpriseCode)) {
            val vehicleResult = SearchInfo.getenterpriseByPlate(vehicleNo, vehicleColor)
            vehicleEnterpriseCode = vehicleResult._1
            useNature = vehicleResult._2
          }
          //车辆所属公司
          val vehicleEnterprise = enterprise.get(vehicleEnterpriseCode)

          //连续行驶时长
          var continuousTime = "-"
          val runTimes: String = redis.hget("more4hours", primaryKey)
          if (runTimes != null) {
            val runTime: Array[String] = runTimes.split("\\|")
            val seconds: Int = runTime(0).toInt
            if (seconds < 0) {
              continuousTime = seconds + "秒"
            } else if (seconds >= 60 && seconds < 3600) {
              continuousTime = seconds / 60 + "分"
            } else if (seconds >= 3600 && seconds < 86400) {
              val m = seconds / 60
              continuousTime = m / 60 + "时" + m % 60 + "分"
            }
          }

          val warnList = x.warns.flatMap { x => x._2 }.map(x => WarnInfoIndex(x.id, null, null, null, null, null,
            null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)).toList

          if (historyRisk.contains(id)) {
            val riskDetailIndex = historyRisk.get(id)
            riskDetailIndex.foreach { riskDetail =>
              val oldRiskCode = riskDetail.riskCode
              val riskDate: String = warnTime
              val lastWarnDate: String = new DateTime(riskDate.toLong).toString("yyyy-MM-dd HH:mm:ss")
//              var processTypeCode = ""
//              var processUserName = ""
//              val random = new Random()
//              val processTypeCodeList = List( "1", "2", "3", "6", "5")
//              val processUserNameMap3: mutable.Map[String, String] = mutable.Map("1" -> "李元韬", "2" -> "李无双", "3" -> "徐超", "6" -> "张松", "5" -> "王方帅")
//              val processUserNameMap5: mutable.Map[String, String] = mutable.Map("1" -> "夏雨涵", "2" -> "梅冰冰", "3" -> "汤鹏成", "6" -> "钟航", "5" -> "钟航")
//              val processUserNameMap6: mutable.Map[String, String] = mutable.Map("1" -> "何喜月", "2" -> "周杰", "3" -> "孙大超", "6" -> "赵涛", "5" -> "赵涛")
//              val typeCodeListIndex: Int = random.nextInt(processTypeCodeList.size)
//              if(vehicleEnterpriseCode.equals("202109090003")){
//                processTypeCode = processTypeCodeList(typeCodeListIndex)
//                processUserName = processUserNameMap3(processTypeCode)
//              }else if(vehicleEnterpriseCode.equals("202109090005")){
//                processTypeCode = processTypeCodeList(typeCodeListIndex)
//                processUserName = processUserNameMap5(processTypeCode)
//              }else if(vehicleEnterpriseCode.equals("202109090006")){
//                processTypeCode = processTypeCodeList(typeCodeListIndex)
//                processUserName = processUserNameMap6(processTypeCode)
//              }
//              riskDetail.setProcessUserName(processUserName)
              riskDetail.setRiskDescription(riskDescription)
              riskDetail.setSpeed(speed10)
              riskDetail.setLongitude(longitude)
              riskDetail.setLatitude(latitude)
              riskDetail.setLastWarnAddress(lastWarnAddress)
              riskDetail.setLastWarnDate(lastWarnDate)
              riskDetail.setProvince(province)
              riskDetail.setCity(city)
              riskDetail.setArea(area)
              riskDetail.setVehicleEnterpriseCode(vehicleEnterpriseCode)
              riskDetail.setVehicleEnterprise(vehicleEnterprise)
              riskDetail.setUseNature(useNature)
              riskDetail.setContinuousTime(continuousTime)
              riskDetail.setCardDriverName(cardDriverName)
              riskDetail.setCardDriverLicence(cardDriverLicence)
              riskDetail.setCardDriverEnterpriseCode(cardDriverEnterpriseCode)
              riskDetail.setCardDriverEnterprise(cardDriverEnterprise)
              riskDetail.setCardContactInfo(cardContactInfo)
              riskDetail.setWarnList(warnList.asJava)
              if (RiskConst.PROCESS_STATUS_UNTREATED.equals(riskDetail.processStatus)) {
                riskDetail.setDriverName(driverName)
                riskDetail.setDriverCode(licenseNumber)
                riskDetail.setDriverEnterpriseCode(driverEnterpriseCode)
                riskDetail.setDriverEnterprise(driverEnterprise)
                riskDetail.setContactInfo(contactInfo)
              }
              val riskInfo = JSON.toJSONString(riskDetail, SerializerFeature.QuoteFieldNames)
              risks += ((oldRiskCode, riskInfo))
              warn(s"往风险${oldRiskCode}里面追加一条报警数据${warnList.last.id}")
            }
          } else {
            //生成新的风险
            val riskDate: String = warnTime
            val lastWarnDate: String = new DateTime(riskDate.toLong).toString("yyyy-MM-dd HH:mm:ss")
            //判断当前风险点是否处于抑制状态
            val key = "riskSuppress:" + vehicleNo + "_" + vehicleColor + "_" + riskTypeCode + "_" + level
            val isSuppress = RiskSuppress.isRiskSuppress(key, redis)
//            var processStatus = RiskConst.PROCESS_STATUS_UNTREATED//满足演示环境风险处理状态，风险生成时就随机生成处理与未处理

            var realFlag = ""
            var holdFlag = ""

            // TODO:  生产环境需要删除
            var processStatus = ""
            val random = new Random()
            val statusList = List("0", "0", "0", "0", "0", "10", "10", "10", "10", "10", "10", "10", "10", "10", "10", "11", "11")//0-待处理 10-已处理 11-暂不处理  2-已逾期
            val num = random.nextInt(statusList.size)
            val riskStatus = statusList(num)
            processStatus = riskStatus

            // TODO:  生产环境需要删除
            var processDate = ""
            var processTypeCode = ""
            var processName = ""
            var processDescription = ""
            var suppressEndtime = ""
            var receiveDate = ""
            var processUserName = ""
            val timeList = List( 3, 8, 13, 18, 23, 28, 33)
            val processTypeCodeList = List( "1", "2", "3", "6", "5")
            val processNameMap: mutable.Map[String, String] = mutable.Map("1" -> "语音对讲", "2" -> "电话通知", "3" -> "短信通知", "6" -> "回场处理", "5" -> "无需处理")
            val processUserNameMap3: mutable.Map[String, String] = mutable.Map("1" -> "李元韬", "2" -> "李无双", "3" -> "徐超", "6" -> "张松", "5" -> "王方帅")
            val processUserNameMap5: mutable.Map[String, String] = mutable.Map("1" -> "夏雨涵", "2" -> "梅冰冰", "3" -> "汤鹏成", "6" -> "钟航", "5" -> "钟航")
            val processUserNameMap6: mutable.Map[String, String] = mutable.Map("1" -> "何喜月", "2" -> "周杰", "3" -> "孙大超", "6" -> "赵涛", "5" -> "赵涛")
            val timeListIndex: Int = random.nextInt(timeList.size)
            val typeCodeListIndex: Int = random.nextInt(processTypeCodeList.size)
            val processCommentList = new util.ArrayList[ProcessComment]
            if ( processStatus == "10" ) {
              processDate = new DateTime(riskDate.toLong + timeList(timeListIndex) * 60 * 1000).toString("yyyy-MM-dd HH:mm:ss")
              receiveDate = (riskDate.toLong + timeList(timeListIndex) * 60 * 1000 - 150 * 1000).toString
              processTypeCode = processTypeCodeList(typeCodeListIndex)
              processUserName = processUserNameMap3(processTypeCode)
              if (processTypeCode == "5") {
                processName = processNameMap(processTypeCode)
                processDescription = "备注" + processName
                val processContent: ProcessComment = getRiskProcessComment(riskTypeCode, "2").head//获取风险处理备注
                processCommentList.add(processContent)
                realFlag = "0"
              } else {
                processName = processNameMap(processTypeCode)
                processDescription = "备注" + processName
                val processContent = getRiskProcessComment(riskTypeCode, "1").head//获取风险处理备注
                processCommentList.add(processContent)
                realFlag = "1"
              }
              if(vehicleEnterpriseCode.equals("202109090003")){
                processTypeCode = processTypeCodeList(typeCodeListIndex)
                processUserName = processUserNameMap3(processTypeCode)
              }else if(vehicleEnterpriseCode.equals("202109090005")){
                processTypeCode = processTypeCodeList(typeCodeListIndex)
                processUserName = processUserNameMap5(processTypeCode)
              }else if(vehicleEnterpriseCode.equals("202109090006")){
                processTypeCode = processTypeCodeList(typeCodeListIndex)
                processUserName = processUserNameMap6(processTypeCode)
              }
            } else if( processStatus == "11") {
              processDate = new DateTime(riskDate.toLong + timeList(timeListIndex) * 60 * 1000).toString("yyyy-MM-dd HH:mm:ss")
              receiveDate = (riskDate.toLong + timeList(timeListIndex) * 60 * 1000 - 150 * 1000).toString
              suppressEndtime = new DateTime(riskDate.toLong + (28 + timeList(timeListIndex)) * 60 * 1000).toString("yyyy-MM-dd HH:mm:ss")//风险一直时间比处理时间多28min
              processTypeCode = "4"
              processName = "抑制"
              processDescription = "备注" + processName
              processUserName = "赵钱孙"
              realFlag = RiskConst.RISK_STATUS_VALID
              holdFlag = RiskConst.RISK_STATUS_HOLD
              var processContent = ProcessComment("", "")
              try {
                processContent = getRiskProcessComment(riskTypeCode, "3").head//获取风险处理备注
              }catch {
                case _ => processContent = ProcessComment("1", "小型车")//获取风险处理备注
              }
              processCommentList.add(processContent)
            }

            // TODO: 生产去掉注释
            /*var realFlag = ""
            var holdFlag = ""
            //添加抑制状态
            if (isSuppress) {
              processStatus = RiskConst.PROCESS_STATUS_SUPPRESS
              realFlag = RiskConst.RISK_STATUS_VALID
              holdFlag = RiskConst.RISK_STATUS_HOLD
            }*/

            //processTypeCode, processName, processDescription, processDate, suppressEndtime, receiveDate,processCommentList原都为null,此处是配合演示造数据
            val riskDetailIndex = RiskDetailIndex(id, riskCode, lastWarnDate, processStatus,
              riskTypeCode, riskName, level, riskGradeName, riskDate,
              riskDescription, lastWarnAddress, province, city, area,
              lon, lat, vehicleNo, vehicleColor, vehicleEnterprise,
              vehicleEnterpriseCode, driverName, licenseNumber, driverEnterprise,
              driverEnterpriseCode, useNature, contactInfo, continuousTime,
              speed10, warnList.asJava,
              if(processStatus =="11") suppressEndtime else null,
              processTypeCode, processName, processDescription,
              if(processStatus == "0") null else processDate,
              null, processUserName, realFlag, holdFlag, null, null,
              null, null, null, null,
              if(processStatus == "0") null else receiveDate,
              null, cardDriverName,
              cardDriverLicence, cardDriverEnterpriseCode, cardDriverEnterprise,
              cardContactInfo, roadId, roadName, generateTime, null, processCommentList)

            val riskInfo = JSON.toJSONString(riskDetailIndex, SerializerFeature.QuoteFieldNames)
            risks += ((riskCode, riskInfo))
            warn(s"生成一条新的风险${riskCode}")
          }
        }
        //        if(risks.size>0){
        //          redis.hset("riskProduce","listSize"+"|"+(new DateTime()),risks.toString()+"|"+risks.size)
        //        }
        EsUtils.insertBulk("risk_detail_index", "_doc", risks)
      }

    }
  }

  /**
   * 小于
   *
   * @param warnCount
   * @param fieldValue
   * @return
   */
  def lessThan(warnCount: Int, fieldValue: String): Boolean = {
    warnCount < fieldValue.toInt
  }

  /**
   * 等于
   *
   * @param warnCount
   * @param fieldValue
   * @return
   */
  def equals(warnCount: Int, fieldValue: String): Boolean = {
    warnCount == fieldValue.toInt
  }

  /**
   * 小于等于
   *
   * @param warnCount
   * @param fieldValue
   * @return
   */
  def noMoreThan(warnCount: Int, fieldValue: String): Boolean = {
    warnCount <= fieldValue.toInt
  }

  /**
   * 大于
   *
   * @param warnCount
   * @param fieldValue
   * @return
   */
  def moreThan(warnCount: Int, fieldValue: String): Boolean = {
    warnCount > fieldValue.toInt
  }

  /**
   * 大于等于
   *
   * @param warnCount
   * @param fieldValue
   * @return
   */
  def noLessThan(warnCount: Int, fieldValue: String): Boolean = {
    warnCount >= fieldValue.toInt
  }

  /**
   * 介于between
   *
   * @param warnCount
   * @param fieldValue
   * @return
   */
  def between(warnCount: Int, fieldValue: String): Boolean = {
    val values = fieldValue.split(",")
    warnCount > values(0).toInt && warnCount <= values(1).toInt
  }

  /**
   * 校验是否达到算法周期内风险生成条件
   *
   * @param warn
   * @param carCurrentWarn
   * @param warnTypeCodes
   * @param _count
   * @param _warnTypeExist
   * @return
   */
  def warnAlgorithmCondition(warn: RiskRuleWarning,
                             carCurrentWarn: util.Map[String, String],
                             warnTypeCodes: List[String],
                             algorithmCycle: Int,
                             warns: mutable.HashMap[String, Iterable[AlarmEvidence]],
                             _count: Int,
                             _warnTypeExist: Boolean,
                             isContinuous: Boolean): (Int, Boolean) = {
    var count = _count
    var warnTypeExist = _warnTypeExist
    try {
      ExpressionEnum.withName(warn.expressionCode) match {
        case ExpressionEnum.EXPRESSION_LESSTHAN => {
          if (carCurrentWarn.containsKey(warn.warningTypeCode)) {
            val warnCount = algorithmCycleCount(carCurrentWarn, algorithmCycle, warn.warningTypeCode, warns, isContinuous)
            if (warn.fieldValue.toInt < warnCount) {
              count += 1
            }
          }
          if (warnTypeCodes.contains(warn.warningTypeCode)) {
            warnTypeExist = true
          }
        }
        case ExpressionEnum.EXPRESSION_BETWEEN => {
          if (carCurrentWarn.containsKey(warn.warningTypeCode)) {
            val warnCount = algorithmCycleCount(carCurrentWarn, algorithmCycle, warn.warningTypeCode, warns, isContinuous)
            val values = warn.fieldValue.split(",")
            if (warnCount > values(0).toInt && warnCount <= values(1).toInt) {
              count += 1
            }
          }
          if (warnTypeCodes.contains(warn.warningTypeCode)) {
            warnTypeExist = true
          }
        }
        case ExpressionEnum.EXPRESSION_EQUALS => {
          if (carCurrentWarn.containsKey(warn.warningTypeCode)) {
            val warnCount = algorithmCycleCount(carCurrentWarn, algorithmCycle, warn.warningTypeCode, warns, isContinuous)
            if (warnCount == warn.fieldValue.toInt) {
              count += 1
            }
          }
          if (warnTypeCodes.contains(warn.warningTypeCode)) {
            warnTypeExist = true
          }
        }
        case ExpressionEnum.EXPRESSION_MORETHAN => {
          if (carCurrentWarn.containsKey(warn.warningTypeCode)) {
            val warnCount = algorithmCycleCount(carCurrentWarn, algorithmCycle, warn.warningTypeCode, warns, isContinuous)
            if (warnCount > warn.fieldValue.toInt) {
              count += 1
            }
          }
          if (warnTypeCodes.contains(warn.warningTypeCode)) {
            warnTypeExist = true
          }
        }
        case ExpressionEnum.EXPRESSION_NOLESSTHAN => {
          if (carCurrentWarn.containsKey(warn.warningTypeCode)) {
            val warnCount = algorithmCycleCount(carCurrentWarn, algorithmCycle, warn.warningTypeCode, warns, isContinuous)
            if (warnCount >= warn.fieldValue.toInt) {
              count += 1
            }
          }
          if (warnTypeCodes.contains(warn.warningTypeCode)) {
            warnTypeExist = true
          }
        }
        case ExpressionEnum.EXPRESSION_NOMORETHAN => {
          if (carCurrentWarn.containsKey(warn.warningTypeCode)) {
            val warnCount = algorithmCycleCount(carCurrentWarn, algorithmCycle, warn.warningTypeCode, warns, isContinuous)
            if (warnCount <= warn.fieldValue.toInt) {
              count += 1
            }
          }
          if (warnTypeCodes.contains(warn.warningTypeCode)) {
            warnTypeExist = true
          }
        }
        case _ => error(s"未匹配到表达式${warn.expressionCode}")
      }
    } catch {
      case e: NoSuchElementException => {
        error(s"未匹配到表达式${warn.expressionCode}", e)
      }
    }
    (count, warnTypeExist)
  }

  /**
   * 校验是否达到风险生成条件
   *
   * @param warn
   * @param carCurrentWarn
   * @param warnTypeCodes
   * @param warns
   * @param _count
   * @param _warnTypeExist
   * @param isContinuous
   * @return
   */
  def warnCondition(warn: RiskRuleWarning,
                    carCurrentWarn: util.Map[String, String],
                    warnTypeCodes: List[String],
                    warns: mutable.HashMap[String, Iterable[AlarmEvidence]],
                    _count: Int,
                    _warnTypeExist: Boolean,
                    isContinuous: Boolean): (Int, Boolean) = {
    var result = (_count, _warnTypeExist)
    try {
      ExpressionEnum.withName(warn.expressionCode) match {
        case ExpressionEnum.EXPRESSION_LESSTHAN => {
          result = drivingCycleCount(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, isContinuous, lessThan)
        }
        case ExpressionEnum.EXPRESSION_BETWEEN => {
          result = drivingCycleCount(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, isContinuous, between)
        }
        case ExpressionEnum.EXPRESSION_EQUALS => {
          result = drivingCycleCount(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, isContinuous, equals)
        }
        case ExpressionEnum.EXPRESSION_MORETHAN => {
          result = drivingCycleCount(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, isContinuous, moreThan)
        }
        case ExpressionEnum.EXPRESSION_NOLESSTHAN => {
          result = drivingCycleCount(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, isContinuous, noLessThan)
        }
        case ExpressionEnum.EXPRESSION_NOMORETHAN => {
          result = drivingCycleCount(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, isContinuous, noMoreThan)
        }
        case _ => error(s"未匹配到表达式${warn.expressionCode}")
      }
    } catch {
      case e: NoSuchElementException => {
        error(s"未匹配到表达式${warn.expressionCode}", e)
      }
    }
    result
  }

  /**
   * 连续驾驶时长判断是否满足要求
   *
   * @param warn
   * @param carCurrentWarn
   * @param warnTypeCodes
   * @param warns
   * @param _count
   * @param _warnTypeExist
   * @param fun
   * @return
   */
  def drivingCycleContinuous(warn: RiskRuleWarning,
                             carCurrentWarn: util.Map[String, String],
                             warnTypeCodes: List[String],
                             warns: mutable.HashMap[String, Iterable[AlarmEvidence]],
                             _count: Int,
                             _warnTypeExist: Boolean,
                             fun: (Int, String) => Boolean): (Int, Boolean) = {
    var count = _count
    var warnTypeExist = _warnTypeExist
    if (carCurrentWarn.containsKey(warn.warningTypeCode)) {
      val evidences = carCurrentWarn.get(warn.warningTypeCode)
      var drivingTime = 0
      if (StringUtils.isNotEmpty(evidences)) {
        val evidenceList = JSON.parseArray(evidences, classOf[AlarmEvidence]).asScala
        val lastEvidence = evidenceList.last
        drivingTime = lastEvidence.drivingTime
        warns.put(warn.warningTypeCode, evidenceList.takeRight(1))
      }
      val fieldValue = warn.fieldValue.toInt * 3600
      if (fun(drivingTime, fieldValue.toString)) {
        count += 1
      }
    }
    if (warnTypeCodes.contains(warn.warningTypeCode)) {
      warnTypeExist = true
    }
    (count, warnTypeExist)
  }

  /**
   * 校验连续驾驶时长是否达到风险生成条件
   *
   * @param warn
   * @param carCurrentWarn
   * @param warnTypeCodes
   * @param warns
   * @param _count
   * @param _warnTypeExist
   * @return
   */
  def drivingCondition(warn: RiskRuleWarning,
                       carCurrentWarn: util.Map[String, String],
                       warnTypeCodes: List[String],
                       warns: mutable.HashMap[String, Iterable[AlarmEvidence]],
                       _count: Int,
                       _warnTypeExist: Boolean): (Int, Boolean) = {
    var result = (_count, _warnTypeExist)
    try {
      ExpressionEnum.withName(warn.expressionCode) match {
        case ExpressionEnum.EXPRESSION_LESSTHAN => {
          result = drivingCycleContinuous(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, lessThan)
        }
        case ExpressionEnum.EXPRESSION_BETWEEN => {
          result = drivingCycleContinuous(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, between)
        }
        case ExpressionEnum.EXPRESSION_EQUALS => {
          result = drivingCycleContinuous(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, equals)
        }
        case ExpressionEnum.EXPRESSION_MORETHAN => {
          result = drivingCycleContinuous(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, moreThan)
        }
        case ExpressionEnum.EXPRESSION_NOLESSTHAN => {
          result = drivingCycleContinuous(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, noLessThan)
        }
        case ExpressionEnum.EXPRESSION_NOMORETHAN => {
          result = drivingCycleContinuous(warn, carCurrentWarn, warnTypeCodes, warns, _count, _warnTypeExist, noMoreThan)
        }
        case _ => error(s"未匹配到表达式${warn.expressionCode}")
      }
    } catch {
      case e: NoSuchElementException => {
        error(s"未匹配到表达式${warn.expressionCode}", e)
      }
    }
    result
  }

  /**
   * 对需要追加报警信息的查询历史风险数据
   *
   * @param genRisks
   * @return
   */
  def queryAddAlarm(genRisks: ListBuffer[RiskGenerate]): mutable.HashMap[String, RiskDetailIndex] = {
    var riskInfoMaps = new mutable.HashMap[String, RiskDetailIndex]()
    val risks = genRisks.filter(x => "1".equals(x.isAddAlarm)).map(x => x.id)
    risks.grouped(500).foreach { risk =>
      val riskIds = risk.mkString("\"", "\",\"", "\"")
      val queryRiskTotal =
        s"""
           |{
           |  "_source": [
           |     "id"
           |  ],
           |  "query": {
           |    "bool": {
           |      "must": [{
           |          "terms": {
           |            "id.keyword": ["${riskIds}"]
           |          }
           |        }
           |      ]
           |    }
           |  }
           |}
           """.stripMargin
      val jestClient = EsUtils.getClient()
      val riskTotal = CommonUtils.getPageSize(queryRiskTotal, "risk_detail_index", "_doc", jestClient)

      val queryRisk =
        s"""
           |{
           |  "from": 0,
           |  "size": ${riskTotal},
           |  "query": {
           |     "bool": {
           |       "must": [{
           |          "terms": {
           |            "id.keyword": ["${riskIds}"]
           |          }
           |        }
           |      ]
           |    }
           |  }
           |}
           """.stripMargin
      val searchRisk: Search = new Search.Builder(queryRisk).addIndex("risk_detail_index").addType("_doc").build()
      val resultRisk = jestClient.execute(searchRisk).getSourceAsObjectList(classOf[RiskDetailIndex], false).asScala
      jestClient.close()
      val riskInfoMap = resultRisk.map(x => (x.id, x)).toMap
      riskInfoMaps ++= riskInfoMap
    }

    riskInfoMaps
  }

  /**
   * 获取当前驾驶人
   *
   * @param driverInfo
   * @param tripStart
   * @return
   */
  def getCurrDriver(driverInfo: util.Map[String, String], tripStart: String): (String, String) = {
    //驾驶员
    var driverName = ""
    //驾驶员从业证号
    var licenseNumber = ""
    //业务时间
    val businessTime = driverInfo.get("businessTime")
    //如果有登签数据后来，当成行驶周期结束
    if (StringUtils.isNotEmpty(businessTime)) {
      try {
        val businessMillis = DateTime.parse(businessTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).getMillis
        var tripStartTime = 0L
        if (tripStart.size == 8) {
          tripStartTime = DateTime.parse(tripStart, DateTimeFormat.forPattern("yyyyMMdd")).getMillis
        } else {
          tripStartTime = tripStart.toLong
        }
        if (businessMillis >= tripStartTime) {
          //驾驶员姓名
          driverName = driverInfo.get("driverName")
          //驾驶证号
          licenseNumber = driverInfo.get("licenseNumber")
        }
      } catch {
        case e: Exception => {
          error(s"业务日期格式不对:${businessTime},${tripStart}")
        }
      }
    }

    (driverName, licenseNumber)
  }

  /**
   * 获取登签当前驾驶人
   *
   * @param primaryKey
   * @param redis
   * @return
   */
  def getSignCurrDriver(primaryKey: String, warnTime: String, redis: Jedis): (String, String) = {
    //驾驶员登签数据
    val driverSign = redis.hgetAll("driverSign:" + primaryKey)
    //驾驶员
    var signDriverName = ""
    //驾驶员从业证号
    var signLicense = ""
    if (MapUtils.isNotEmpty(driverSign)) {
      //驾驶员打卡时间
      val signInTime = driverSign.get("time")
      //1-驾驶员上班; 2-下班
      val stats = driverSign.get("state")
      try {
        val signInMillis = DateTime.parse(signInTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).getMillis
        if ("1".equals(stats)) {
          if (warnTime.toLong >= signInMillis) {
            signDriverName = driverSign.get("driverName")
            signLicense = driverSign.get("licence")
          }
        } else if ("2".equals(stats)) {
          if (warnTime.toLong <= signInMillis) {
            signDriverName = driverSign.get("driverName")
            signLicense = driverSign.get("licence")
          }
        }
      } catch {
        case e: Exception => {
          error(s"登签数据日期格式不对:${signInTime}")
        }
      }

    }
    (signDriverName, signLicense)
  }

  /**
   * 统计算法周期内的报警次数
   *
   * @param carCurrentWarn
   * @param algorithmCycle
   * @param warningTypeCode
   * @return
   */
  def algorithmCycleCount(carCurrentWarn: util.Map[String, String],
                          algorithmCycle: Int, warningTypeCode: String,
                          warns: mutable.HashMap[String, Iterable[AlarmEvidence]],
                          isContinuous: Boolean): Int = {
    val evidences = carCurrentWarn.get(warningTypeCode)
    var warnCount = 0
    if (StringUtils.isNotEmpty(evidences)) {
      val evidenceList = JSON.parseArray(evidences, classOf[AlarmEvidence]).asScala
      //最新的一条报警信息
      val lastEvidence = evidenceList.last
      //最新的一条报警的报警时间
      val time = lastEvidence.time
      //最新的一条报警的报警id
      val id = lastEvidence.id
      val repeatEvidenceCount = evidenceList.count(x => id.equals(x.id))
      //最后一条和之前的不重复的情况
      if (repeatEvidenceCount == 1) {
        val evidenceFilter = evidenceList.filter { x =>
          val intervalTime = time.toLong - x.time.toLong
          if (intervalTime >= 0 && intervalTime < algorithmCycle * 60) {
            true
          } else {
            false
          }
        }
        if (evidenceFilter.nonEmpty) {
          warnCount = evidenceFilter.size
          //去除重复的证据
          val evidences = evidenceFilter.groupBy(x => x.id).map(x => x._2.head)
          if (isContinuous) {
            warnCount = warnCount / 2
          } else {
            warnCount = evidences.size
          }
          warns.put(warningTypeCode, evidences)
        }
      }
    }
    warnCount
  }

  /**
   *
   * @param warn
   * @param carCurrentWarn
   * @param warnTypeCodes
   * @param warns
   * @param _count
   * @param _warnTypeExist
   * @param isContinuous
   * @param fun
   * @return
   */
  def drivingCycleCount(warn: RiskRuleWarning,
                        carCurrentWarn: util.Map[String, String],
                        warnTypeCodes: List[String],
                        warns: mutable.HashMap[String, Iterable[AlarmEvidence]],
                        _count: Int,
                        _warnTypeExist: Boolean,
                        isContinuous: Boolean,
                        fun: (Int, String) => Boolean): (Int, Boolean) = {
    var count = _count
    var warnTypeExist = _warnTypeExist
    if (carCurrentWarn.containsKey(warn.warningTypeCode)) {
      val evidences = carCurrentWarn.get(warn.warningTypeCode)
      var warnCount = 0
      if (StringUtils.isNotEmpty(evidences)) {
        val evidenceList = JSON.parseArray(evidences, classOf[AlarmEvidence]).asScala
        //最新的一条报警信息
        val lastEvidence = evidenceList.last
        //最新的一条报警的报警id
        val id = lastEvidence.id
        val repeatEvidenceCount = evidenceList.count(x => id.equals(x.id))
        //最后一条和之前的不重复的情况
        if (repeatEvidenceCount == 1) {
          //去除重复的报警数据
          val evidenceLists = evidenceList.groupBy(x => x.id).map(x => x._2.head)
          warns.put(warn.warningTypeCode, evidenceLists)
          warnCount = evidenceList.size
          if (isContinuous) {
            warnCount = warnCount / 2
          } else {
            warnCount = evidenceLists.size
          }
        }
      }
      if (fun(warnCount, warn.fieldValue)) {
        count += 1
      }
    }
    if (warnTypeCodes.contains(warn.warningTypeCode)) {
      warnTypeExist = true
    }
    (count, warnTypeExist)
  }

  /**
   * @Description 手动为风险添加终端和登签驾驶人信息--演示环境造数据，临时
   * @Date  2021/9/29
   * @Param [vehicleNo]
   * @return DriverInfo
   **/
  def getDriverBaseInfo(vehicleNo:String):DriverInfo = {
    val driverInfoSql =
      """
        |select
        |	a.enterprise_code,
        | b.enterprise_name,
        | a.plate_num,
        | c.name,
        | c.license_number,
        | c.qualification_certificate_number
        |from zcov.basic_vehicle_info a
        |inner join zcov.basic_enterprise_info b on a.enterprise_code=b.enterprise_code
        |inner join zcov.basic_driver_info c on b.enterprise_code=c.enterprise_code
        |where a.status = 1 and a.deleted != 1 and a.plate_num = ? order by rand() limit 1
        |""".stripMargin

    DbClient.init("driverInfo",
      ApolloConst.jgdMysqlDriver, ApolloConst.jgdMysqlURL, ApolloConst.jgdMysqlUserName, ApolloConst.jgdMysqlPassWord)
    val driverInfo = DbClient.usingDB("driverInfo") {
      db => {
        db readOnly {
          implicit session => {
            SQL(driverInfoSql)
              .bind(vehicleNo)
              .map(rs => DriverInfo(rs.get[String](1), rs.get[String](2), rs.get[String](3), rs.get[String](4), rs.get[String](5), rs.get[String](6)))
              .list()
              .apply()
          }
        }
      }
    }
    if (null != driverInfo && !driverInfo.isEmpty) driverInfo.head else DriverInfo("202109020002","演示企业","渝C6K548","丁健","510822199008240016","510822199008240016")
  }

  /**
   * 获取风险处理备注信息--演示环境造数据，临时
   *
   * @return
   */
  def getRiskProcessComment(riskTypeCode: String, commentType: String): List[ProcessComment] = {
    DbClient.init("riskRule",
      ApolloConst.jgdMysqlDriver,
      ApolloConst.jgdMysqlURL,
      ApolloConst.jgdMysqlUserName,
      ApolloConst.jgdMysqlPassWord)
    val riskComment = DbClient.usingDB("riskRule") {
      db => {
        db readOnly {
          implicit session => {
            SQL("select id,content from zcov.config_risk_comment where parent_risk_id =? and comment_type =? order by rand() limit 1")
              .bind(riskTypeCode, commentType)
              .map { rs =>
                ProcessComment(rs.get("id"), rs.get("content"))
              }.list()
              .apply()
          }
        }
      }
    }
    riskComment
  }
}

case class DriverInfo(
                       enterprise_code :String,
                       enterprise_name :String,
                       plate_num :String,
                       name :String,
                       license_number :String,
                       qualification_certificate_number :String
                     )
